kafka生产者(producer)—— 向Kafka写入数据

生产者发送消息

向kafka发送消息的主要步骤如下图:

创建ProducerRecord对象,其中包含主题(topic)和内容(value)两个必传参数,也可以添加键(key)和分区(partition)两个非必传参数; 然后在发送ProducerRecord对象,生产者要先把键值对象序列化成字节数组,这样他们才能在网络上传输; 接着数据传入分区器,如果之前在ProducterRecord对象指定了分区,那么分区器直接把指定的分区返回,如果没有指定分区,那么分区器会根据ProducerRecord对象的键来选择一个分区。选好分区后,生产者就知道往哪个主题和分区发送这条记录了; 然后这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上; 有一个独立的线程负责把这些记录批次发送到相应的broker上; 服务器在收到这些消息时会返回一个响应,如果消息成功写入kafka,就返回一个RecordMetaData对象,他包含了主题和分区信息,以及记录在分区里的偏移量,如果写入失败则会返回一个错误; 生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败就返回报错信息。

创建kafka生产者

要往kafka写入消息,首先要创建一个生产者对象,并设置一些属性,kafka生产者有三个必选的属性。 bootstrap.servers 该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有broker地址,生产者会从给定的broker里查找到其他的broker信息。不过建议至少要提供两个broker信息,一旦其中一个宕机,生产者仍然能连上集群。 key.serializer broker希望接受到的消息的键值都是字节数组。生产者接口允许使用参数化类型,因此可以把java对象作为键值发送给broker。这样的代码具有良好的可读性,不过生产者需要知道如何将这些java对象转化为字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serializetion.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组 value.serializer 与key.serialier一样,value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。 下面代码片段演示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置

private Properties kafkaProps = new Properties();

kafkaProps.put("bootstrap.servers","broker1:9092,borker2:9092");

kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String,String>(kafkaProps);

这个接口很简单,通过配置生产者的不同属性就可以很大程度地控制它的行为。 实例化生产者对象后,接下来就是可以开始发送消息了。发行送消息主要有以下三种方式: 发送并忘记(fire-and-forget) 我们把消息发送给服务,但并不关心它是否正常到达,大多数情况下,消息会正常到达,因为kafka是高可用的,而且生产者会自动尝试重发,不过,使用这种方式有时候会丢失一些消息。 同步发送 我们使用send()方法发送消息,他会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功。 异步发送 我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。 下面会举几个例子,介绍如何使用上述几种方法来发送消息,以及如何处理可能发生的异常情况。 本篇所有的例子都是使用单线程,但其实生产者是可以使用多线程来发送消息的,刚开始的时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以再生产者数量不变的前提下增加线程数量,无果这样做还不够,可以增加生产者数量。

发送消息到Kafka

发送并忘记 最简单的消息发送方式如下所示:

ProducerRecord<String,String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");
try {
    producer.send(record);
} catch(Exception e){
    e.printStackTrace();
}

生产者的send()方法将ProducerRecord对象作为参数,所以首先需要创建一个ProducerRecord对象。ProducerRecord有多个构造函数。这里使用其中一个构造函数,需要目标主题的名字好要发送的键和值对象,他们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。 producer的send()方法发送ProducerRecord对象,从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send()方法会返回一个包含RecordMetadata的Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录不太重要的应用程序日志。 虽然忽略了发消息时或者服务端可能发生的错误,但是发送消息之前,生产者还是有可能发生其他异常。这些异常有可能是SerializationException(说明序列化消息失败),BufferExhaustedException或TimeoutException(说明缓冲区已满),又或者是InterruptException(说明发送线程被中断)。 同步发送消息 最简单的同步发送消息方式如下所示:

ProducerRecord<String,String> record = new ProducerRecord<>("CustomerCountry","Precision Products","France");
try{
    producer.send(record).get();
}catch(Exception e){
    e.printStackTrace();
}

producer.send()方法返回一个Future对象,然后调用Future对象的get()方法等待Kafka响应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。 如果在发送数据之前或者在发送过程中发生了任何错误,比如broker返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常被catch捕获。 KafkaProducer一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProduer可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KakfaPrducer不会进行任何重试,直接抛出异常。 异步发送消息 假设消息在应用程序和Kafka集群之间一个来回需要10ms。如果在发送完每个消息后都等待回应,那么发送100个消息就需要1秒。但是如果只发送消息而不等待回应,那么发送你个100个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应,尽管Kafka会把目标主题,分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志或者把消息写入“错误消息”文件以便日后分析。 为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。下面使用回调的一个例子:

private class DemoProducerCallback implements Callback{
    @Override
    public void onCompletion(RecordMetadata recordMetadata,Exception e){
        if(e != null){
            e.printStackTrace();
        }
    }
    ProducerRecord<String,String> record = new ProducerRecord<>("CustomerCountry","Biomedical Materials","USA");
    producer.send(record,new DemoProducerCallback());
}

为了使用回调,需要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个接口只有一个onCompletion方法。 如果Kafka返回一个错误,onCompletion方法会抛出一个非空(non null)异常。这里我们只是简单地打印出来,但是生产环境应该有更好的处理方式。 在producer.send()发送消息时传入一个回调对象。

生产者的配置

之前介绍了生产者的几个必要的配置参数,接着说说生产者其他很多可配置的参数,在Kafka文档里面都有说明,它们大部分都有很合理的默认值,所以没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,下面介绍一下这些配置参数: acks acks参数指定了必须要有多少个分区副本接受消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项: 如果acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。 如果acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过如果一个没有收到消息的节点成为首领,消息还是会丢失。这个时候的吞吐量取决于会用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future对象的get()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如生产者在收到服务器响应之前可以发送多少个消息)。 如果acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证布置一个服务器收到消息,就算有服务发生崩溃,整个集群仍然可以运行。不过,他的延迟比acks=1时更高,因为我们要等待不止一个服务器节点接收消息。 buffer.memory 该参数用于设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候,send()方法调用要么被阻塞,要么抛出异常,取决于如何设置block.on.buffer.full参数(在0.9.0.0版本里被替换成了max.block.ms,表示在抛出异常之前可以阻塞一段时间)。 compression.type 默认情况下,消息发送时不会被压缩。该参数可以设置为snappy、gzip或lz4,它指定了消息被发送给broker之前使用哪一种压缩算法进行压缩。snappy压缩算法由谷歌发明,它占用较少的cpu,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip压缩算法一般会占用较多的cpu,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向kafka发送消息的瓶颈所在。 retries 生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待100ms,不过可以通过retry.backoff.ms参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比kafka集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过这些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。 batch.size 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只会占用更多内存而已。但是如果设置得太小,因为生产者需要更加频繁地发送消息,会增加一些额外的开销。 linger.ms 该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer会在批次填满或者linger.ms达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把linger.ms设置成比0大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。 client.id 该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。 max.in.flight.requests.per.connection 该参数指定了生产者在收到服务器响应之前可以发送多少个消息。他的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设置为1可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。 timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或者执行回调)。timeout.ms指定了broker等待同步副本返回消息确认的时间,与asks的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么boker就会返回一个错误。 max.block.ms 该参数指定了在调用send()方法或使用partitionFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到max.block.ms时,生产者会抛出超时异常。 max.request.size 该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为1MB,那么可以发送的单个最大消息为1M,或者生产者可以在单个请求里发送一个批次,该批次包含了1000个消息,每个消息大小为1KB。另外,broker对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被broker拒绝。 receive.buffer.bytes和send.buffer.bytes 这两个参数分别指定了TCP socket接收和发送数据包的缓冲区大小。如果它们被设为-1,就使用操作系统的默认值。如果生产者和消费者与broker处于不同的数据中心,那么可以适当地增大这些值,因为跨数据中心的网络一般都会有比较高的延迟和比较低的带宽。

顺序保证

Kafka可以保证同一个分区里的消息是有序的,也就是说,如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某种情况下。顺序是非常重要的。例如,往一个账户存入100元再取出来,这个与先取钱再存钱是截然不同的,不过,有些场景对顺序的要求不是很敏感。 如果把retries设置为非零整数,同时把max.in.flight.requests.per.connection设为比1大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker会重试写入第一个批次。如果此时第一个批次也写入成功,那么这两个批次的顺序就反过来了。 一般来说,如果某些场景要求消息时有序的,那么消息是否写入成功也是很关键的,所以不建议把retries设为0.可以把max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

序列化器

创建一个生产者对象必须制定序列化器。前文演示过如何使用默认的字符串序列化器,kafka还提供了整型和字节数组序列化器,不过它们还不足以满足大部分场景的需求。到最后,我们需要序列化的记录类型会越来越多。接下来演示如何开发自己的序列化器,并介绍Avro序列化器作为推荐的备选方案。 自定义序列化器 如果发送到Kafka的对象不是简单的字符串或者整型,那么可以使用序列化框架来创建消息记录,比如Avro、Thrift或者Protobuf,或者使用自定义序列化器。强烈建议使用通用的序列化框架。不过为了了解序列化器的工作原理,也为了说明为什么要使用序列化框架,下面来看看如何自定义一个序列化器。 假设你创建了一个简单的类来表示一个客户:

public class Customer {
    private int customerId;
    private String customerName;

    public Customer(int customerId,String customerName){
        this.id = customerId;
        this.name = customerName;
    }
    public int getCustomerId(){
        return customerId;
    }
    public String getCustomerName(){
        return customerName;
    }
}

现在我们要为这个类创建一个序列化器,它看起来可能是这样的:

import org.apache.kafka.common.errors.SerializationException;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer>{

    @Override
    public void configure(Map configs,boolean isKey){
        //不做任何配置
    }

    /**
    *Customer对象被序列化成:
    *   表示id的4字节整数
    *   表示name长度的4字节整数(如果name为空,则长度为0)
    *   表示name的N个字节
    */
    @Override
    public byte[] serialize(String topic,Customer data){
        try{
            byte[] serializedName;
            int stringSize;
            if(data == null){
                return null
            }else{
                if(data.getName != null){
                    serializedName = data.getCustomerName().getBytes("utf-8");
                    stringSize = serializedName.length;
                }else{
                    serializedName = new bytes[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getCustomerId());
            buffer.putInt(stringSize);
            buffer.put(serializedName);

            return buffer.array();
        }catch(Exception e){
            throw new SerializationException("Error when serializing Customer to byte[]" + e);
        }
    }

    @Override
    public void close(){
        //不需要关闭任何东西
    }

}

只要使用这个CustomerSerializer,就可以把消息记录定义成ProducerRecord<String,Customer>,并且可以直接把Customer对象传给生产者。这个例子很简单,不过代码看起来很脆弱,如果有多种类型的消费者,可能需要把customerId字段变成长整型,或者为Customer添加startDate字段,这样就会出现新旧消息兼容性的问题。在不同版本的序列化器和反序列化器之间调试兼容性问题着实是个挑战。更糟糕的是如果同一个公司的不同团队都往Kafka写入Customer数据,那么他们就需要使用相同的序列化器,如果序列化器发生改动,他们几乎要在同一时间修改代码。 基于以上原因,所以强烈不建议使用自定义序列化器,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式

分区

在上文书,ProducerRecord对象包含了目标主题、键和值。Kafka的消息时一个个键值对,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需要下面这样创建ProducerRecord对象:

ProducerRecord<Integer,String> record = new ProducerRecord<>("CustomerCountry","Laboratory Equipment","USA");

如果要创建键为null的消息,不指定键就可以了:

ProducerRecord<Integer,String> record = new ProducerRecord<>("CustomerCountry","USA");

如果键值为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。 如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列,然后根据散列值把消息映射到特定的分区上,这里的关键之处在于,同一个键总会被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区时不可用的,那么就会发生错误。但是这种情况很少发生。 只有在不改变分区数量的情况下,键与分区之间的映射才能保持不变。比如,在分区数量保持不变的情况下,可以保证用户12138的记录总是被写到分区3中。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了,旧数据仍然在分区3上,但是新的数据有可能被写到其他分区上了,如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好,而且永远不要增加新的分区。 实现自定义分区策略 除了散列分区之外,有时候也需要对数据进行不一样的分区,假设你是一个B2B供应商,你有一个大客户,他是手持设备Banana的供应商。Banana占据了你的整体业务的10%的份额。如果使用默认的散列分区算法,Banana的账号记录将和其他账号记录一起被分配给相同的分区,导致这个分区比其他分区要大一些。服务器有可能因此出现存储空间不足、处理缓慢等问题。我们需要给Banana分配单独的分区,然后使用散列分区算法处理其他账号。 下面举一个自定义分区器的例子:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

/**
*Partitioner接口包含了configure、partition和close这三个方法
*不过此处只是作为演示的硬代码,仅供学习,“Banana”应该通过configure方法传入
*/
public class BananaPartitioner implements Partitioner {
    public void configure(Map<String,?> configs){

    }

    public int partition(String topic,Object key,byte[] keyBytes,Object value,byte[] valueBytes,Cluster cluster){
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if((keyBytes == null)||(!(key instanceOf String))){
            throw new InvalidRecordException("We expect all messages to have customer name as key");
        }

        if(((String)key).equals("Banana")){
            //Banana总是被分配到最后一个分区
            return numPartitions;
        }

        return(Math.abs(Utils.murmur2(keyBytes))%(numPartitions - 1));
    }

    public void close(){

    }

}